Recordings Management (Programmatically)
Here we will show you how can schedule, start and stop Recordings using Python.
Scheduling a Recording
First, we need a graphql client. In this example, we will use the python_graphql_client
library:
pip install python_graphql_client
import os
import sys
from datetime import datetime
from python_graphql_client import GraphqlClient
endpoint = os.environ["HASURA_GRAPHQL_ENDPOINT"]
secret = os.environ["HASURA_GRAPHQL_ADMIN_SECRET"]
client = GraphqlClient(endpoint=endpoint, headers={ "x-hasura-admin-secret": secret })
Next, we can define the following queries for:
- Getting the id of the camera we want to record with (
camera_with_model_and_serial
) - Adding a recording session (
insert_recording_session
) - Adding a record to a session (
insert_record
)- A record defines that a camera should be used to record data for a session
- Adding a stream to a record (
insert_stream
)- A stream defines the kind of data a camera should record. You can have multiple streams per camera as they could record color, depth and infra at the same time
- Setting a sessions state (
update_recording_session_state
)- Allows us to start and stop sessions.
camera_with_model_and_serial_query = """query CameraWithModelAndSerial($model: camera_model_enum!, $serial: String!) {
cameras(where: {model: {_eq: $model}, serial: {_eq: $serial}}) {
id
}
}
"""
def camera_with_model_and_serial(model, serial):
result = client.execute(
camera_with_model_and_serial_query,
{
"model": model,
"serial": serial,
},
)
return result["data"]["cameras"][0]["id"]
insert_recording_session_query = """mutation InsertRecordingSession(
$name: String!,
$startDate: timestamp!,
$endDate: timestamp!,
$mode: RecordingSessionMode_enum!,
$state: RecordingSessionState_enum!
$maxSizeMegabytes: Int!,
$maxTimeSeconds: Int!,
) {
insertRecordingSession(object: {
name: $name,
startDate: $startDate,
endDate: $endDate,
mode: $mode,
state: $state
maxTimeSeconds: $maxTimeSeconds,
maxSizeMegabytes: $maxSizeMegabytes,
storage: { data: {
totalSize: 0,
estimatedSize: 0,
}}
}) {
id
}
}
"""
def insert_recording_session(now, later, name):
return client.execute(
insert_recording_session_query,
{
"name": name,
"startDate": now,
"endDate": later,
"mode": "Storage",
"state": "Stopped",
"maxTimeSeconds": "0",
"maxSizeMegabytes": "0",
},
)["data"]["insertRecordingSession"]["id"]
insert_record_query = """mutation InsertRecord($recordingId: uuid!, $cameraId: uuid!) {
insertRecord(object: {cameraId: $cameraId, recordingId: $recordingId}) {
id
}
}
"""
def insert_record(recording_session_id, camera_id):
return client.execute(
insert_record_query,
{
"recordingId": recording_session_id,
"cameraId": camera_id,
},
)["data"]["insertRecord"]["id"]
insert_stream_query = """mutation InsertStream(
$recordId: uuid!,
$type: stream_type_enum!,
$alignTo: String!,
$encoder: stream_encoder_enum!,
$width: Int!
$height: Int!,
$nearCut: Int!,
$farCut: Int!,
$bitrate: Int!,
$frameRate: Int!,
) {
insertStream(object: {
recordId: $recordId,
type: $type,
alignTo: $alignTo,
encoder: $encoder,
width: $width,
height: $height,
nearCut: $nearCut,
farCut: $farCut,
bitrate: $bitrate
frameRate: $frameRate,
}) {
id
}
}
"""
def insert_stream(
record_id, type, align_to, encoder, width, height, near_cut, far_cut, bitrate, frame_rate
):
return client.execute(
insert_stream_query,
{
"recordId": record_id,
"type": type,
"alignTo": align_to,
"encoder": encoder,
"width": width,
"height": height,
"nearCut": near_cut,
"farCut": far_cut,
"bitrate": bitrate,
"frameRate": frame_rate,
},
)
update_recording_session_state_query = """mutation UpdateRecordingSessionState($id: uuid!, $state: RecordingSessionState_enum!) {
updateRecordingSession(pk_columns: {id: $id}, _set: {state: $state}) {
id
}
}
"""
def update_recording_session_state(session_id, state):
client.execute(update_recording_session_state_query, {"id": session_id, "state": state})
With these functions, we can make small scripts for working with recording sessions. Here is a small script that can add and stop a recording session with one camera and two streams:
# A simple script that allows you to start and stop recording sessions
def print_help():
print(f"To start: {sys.argv[0]} add <camera-serial> <date>")
print(f"To stop: {sys.argv[0]} stop <recording-session-id>")
def main():
if len(sys.argv) < 2:
print("Missing start/stop argument")
print_help()
return 1
if sys.argv[1] == "start":
if len(sys.argv) < 4:
print("Missing camera-serial or date argument")
print_help()
return 1
camera_id = camera_with_model_and_serial("REALSENSE_D435", sys.argv[2])
now = datetime.now(timezone.utc).strftime("%Y-%m-%dT%H:%M:%S")
later = sys.argv[3]
recording_session_id = insert_recording_session(now, later, "mysession")
record_id = insert_record(recording_session_id, camera_id)
insert_stream(record_id, "color", "color", "HEVC", "1280", "720", "300", "1000", "1000000", "15")
insert_stream(record_id, "depth", "color", "Classic", "1280", "720", "300", "1000", "5000000", "15")
update_recording_session_state(recording_session_id, "Scheduled")
print(f"Started a Recording Session with start date: {now} and end date {later}.")
print("Note the recording session id to stop the session early:")
print()
print(recording_session_id)
elif sys.argv[1] == "stop":
if len(sys.argv) < 3:
print("Missing recording-session-id argument")
print_help()
return 1
update_recording_session_state(sys.argv[2], "Stopped")
return 0
if __name__ == '__main__':
sys.exit(main())
You can also trigger a session to start and stop recording with low latency. To do this we need to have the session in the Standby
state:
import time
def main():
# Initialize parser
parser = argparse.ArgumentParser()
# Adding optional argument
parser.add_argument("-m", "--Model", help = "Introduce the model of the camera.")
parser.add_argument("-s", "--Serial", help = "Introduce the serial number of the camera.")
parser.add_argument("-r", "--Recording", help = "Introduce the name of the recording session.")
# Read arguments from command line
args = parser.parse_args()
if not args.Model:
sys.exit("You are missing the model of the camera.")
if not args.Serial:
sys.exit("You are missing the serial number of the camera.")
if not args.Recording:
sys.exit("You are missing the name of the recording session.")
camera_id = camera_with_model_and_serial(args.Model, args.Serial)
now = datetime.min.strftime("%Y-%m-%dT%H:%M:%S")
later = datetime.max.strftime("%Y-%m-%dT%H:%M:%S")
recording_session_id = insert_recording_session(now, later, args.Recording)
record_id = insert_record(recording_session_id, camera_id)
insert_stream(record_id, "color", "color", "HEVC", 1280, 720, 300, 1000, 1000000, 30)
insert_stream(record_id, "depth", "color", "Classic", 1280, 720, 300, 1000, 5000000, 30)
update_recording_session_state(recording_session_id, "Standby")
# Wait 5 seconds and record for 10. Do so 5 times.
for i in range(5):
sleep(5)
update_recording_session_state(recording_session_id, "Started")
sleep(10)
update_recording_session_state(recording_session_id, "Standby")
update_recording_session_state(recording_session_id, "Stopped")
if __name__ == '__main__':
sys.exit(main())
You can find the full script in the aivero-python-interface under the name standby-mode-trigger-every-10-seconds.py
.
The values which are allowed for the Model
argument are:
- REALSENSE (This is for the D405 camera)
- REALSENSE_D415
- REALSENSE_D435
- REALSENSE_D455
- V4L2CAMERA (This is for RaspberryPi cameras)